RabbitMQ(三)手动Ack确认 您所在的位置:网站首页 rabbit mq 消息确认 RabbitMQ(三)手动Ack确认

RabbitMQ(三)手动Ack确认

2023-08-23 02:26| 来源: 网络整理| 查看: 265

默认情况下 spring-boot-data-amqp 是自动ACK机制,就意味着 MQ 会在消息发送完毕后,自动帮我们去ACK,然后删除消息的信息。 这样依赖就存在这样一个问题: 如果消费者处理消息需要较长时间,最好的做法是消费端处理完之后手动去确认。 消费者:

@Service("confirmListener") public class ConfirmListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { try { String messageStr = new String(message.getBody()); System.out.println("消费者接收到信息 : " + messageStr); if (messageStr.equals("hello")) { //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } catch (Exception e) { e.printStackTrace();//TODO 业务处理 //丢弃这条消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } }

生产者及其他配置与上文相同 如果请求是

http://localhost:8081/test/send?exchange=exchangeTest&key=queueTestKey&message=hello

结果为:在这里插入图片描述 此时队列情况是: 在这里插入图片描述 如果请求是:

http://localhost:8081/test/send?exchange=exchangeTest&key=queueTestKey&message=helloaaa

在这里插入图片描述 helloaaa这个消息没有ack确认 在这里插入图片描述 只要程序还在运行,这1条消息就一直是 Unacked 状态,无法被 RabbitMQ 重新投递。当程序关闭时(实际只要 Consumer 关闭就行),这1条消息会恢复为 Ready 状态。

这种情况下再次发送hello:没有被监听类接收! 在这里插入图片描述 被没有ack的消息阻塞了!hello这一消息是ready的状态。

找其原因,是否因为prefetch值?

spring-amqp中的prefetch默认值是1。 channel.BasicQos(0, 1, false); 第二个参数是处理消息最大的数量。举个例子,如果输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息,消息只会在队列中阻塞。

测试: 在这里插入图片描述 发送两条helloaaa,再发送一条hello: 在这里插入图片描述 最后那条正常的消息不再被阻塞了,成功接收。

另外,消息的确认类型:

channel.basicAck(deliveryTag, multiple); consumer处理成功后,通知broker删除队列中的消息,如果设置multiple=true,表示支持批量确认机制以减少网络流量。 例如:有值为5,6,7,8 deliveryTag的投递 如果此时channel.basicAck(8, true);则表示前面未确认的5,6,7投递也一起确认处理完毕。 如果此时channel.basicAck(8, false);则仅表示deliveryTag=8的消息已经成功处理。

channel.basicNack(deliveryTag, multiple, requeue); consumer处理失败后,例如:有值为5,6,7,8 deliveryTag的投递。 如果channel.basicNack(8, true, true);表示deliveryTag=8之前未确认的消息都处理失败且将这些消息重新放回队列中。 如果channel.basicNack(8, true, false);表示deliveryTag=8之前未确认的消息都处理失败且将这些消息直接丢弃。 如果channel.basicNack(8, false, true);表示deliveryTag=8的消息处理失败且将该消息重新放回队列。 如果channel.basicNack(8, false, false);表示deliveryTag=8的消息处理失败且将该消息直接丢弃。

channel.basicReject(deliveryTag, requeue); 相比channel.basicNack,除了没有multiple批量确认机制之外,其他语义完全一样。 如果channel.basicReject(8, true);表示deliveryTag=8的消息处理失败且将该消息重新放回队列。 如果channel.basicReject(8, false);表示deliveryTag=8的消息处理失败且将该消息直接丢弃。

由此可见,手动Ack如果处理方式不对会发生一些问题。 1.没有及时ack,或者程序出现bug,所有的消息将被存在unacked中,消耗内存 如果忘记了ack,那么后果很严重。当Consumer退出时,Message会重新分发。然后RabbitMQ会占用越来越多的内存,由于 RabbitMQ会长时间运行,因此这个“内存泄漏”是致命的。 2.如果使用BasicNack,将消费失败的消息重新塞进队列的头部,则会造成死循环。 (解决basicNack造成的消息循环循环消费的办法是为队列设置“回退队列”,设置回退队列和阀值,如设置队列为q1,阀值为2,则在rollback两次后将消息转入q1)

综上,手动ack需要注意的是: 1.在消费者端一定要进行ack,或者是nack,可以放在try方法块的finally中执行 2.可以对消费者的异常状态进行捕捉,根据异常类型选择ack,或者nack抛弃消息,nack再次尝试 3.对于nack的再次尝试,是进入到队列头的,如果一直是失败的状态,将会造成阻塞。所以最好是专门投递到“死信队列”,具体分析。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有